Feat/tool sequences #285

Open
tianmu-li wants to merge 22 commits into mlcommons:main from tianmu-li:feat/tool_sequences

Conversation

@tianmu-li
Collaborator

What does this PR do?

Updates the multi-turn implementation for #232: adds tool sequencing and fixes the scheduler's handling of concurrent requests.

Type of change

  • Bug fix
  • New feature
  • Documentation update
  • Refactor/cleanup

Related issues

Testing

  • Tests added/updated
  • All tests pass locally
  • Manual testing completed

Checklist

  • Code follows project style
  • Pre-commit hooks pass
  • Documentation updated (if needed)

@github-actions

github-actions Bot commented Apr 17, 2026

MLCommons CLA bot All contributors have signed the MLCommons CLA ✍️ ✅


@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a comprehensive multi-turn conversation benchmarking framework, including a new MultiTurnScheduler, ConversationManager, and MultiTurnDataset. These additions enable benchmarking of conversational AI workloads with turn sequencing, conversation history management, and optional concurrency control. My review identified potential issues regarding the usage of sentinel objects in the scheduler and the robustness of the timeout logic in the conversation manager.

Comment thread src/inference_endpoint/load_generator/scheduler.py Outdated
f"Turn {turn} of {conv_id} timed out waiting for prev turn"
)
break # Skip remaining turns to avoid cascade timeouts
ready.put((idx, 0))


medium

The _PIPELINE_DONE sentinel is pushed to the queue, but the consumption logic in the finally block or the iterator loop does not explicitly handle it as a termination signal for individual threads, relying instead on the queue being drained. Ensure this is the intended design to avoid potential race conditions.

Comment thread src/inference_endpoint/load_generator/conversation_manager.py Outdated
Collaborator

@arekay-nv arekay-nv left a comment


Partial review - will complete later.

Comment thread examples/09_MultiTurn/README.md Outdated
- Plain chat: `user → assistant → user → ...`
- Agentic: `user → assistant (with tool_calls) → tool → [tool | assistant (with tool_calls)]* → assistant → user → ...`
2. First turn must be "user" role
3. Turn numbers must be sequential (1, 2, 3, ...)
Collaborator


Should all the turns be ordered and consecutive in the dataset? For instance, if I have turn 1 of all conversations followed by turn 2 of all conversations, is that a valid format? Would it not be easier to have a conversation as a top-level object that contains a list of messages and associated metadata?
The challenge here would be reading in all the conversations to collect all the turns, unless we enforce some constraints.

Collaborator Author


Yes, all turns are now ordered and consecutive.
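A sketch of what such a consecutiveness check can look like (`validate_consecutive` is a hypothetical helper, not the PR's actual validator, which raises `InputValidationError`):

```python
from typing import Optional

def validate_consecutive(conv_ids: list) -> None:
    """Raise if rows for any conversation_id do not form one contiguous run."""
    seen: set = set()
    prev: Optional[str] = None
    for cid in conv_ids:
        if cid != prev and cid in seen:
            # the id reappeared after a different conversation intervened
            raise ValueError(f"rows for conversation {cid!r} are not consecutive")
        seen.add(cid)
        prev = cid
```

A single pass over the `conversation_id` column suffices; no global sort is needed.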

Comment thread examples/09_MultiTurn/README.md Outdated
Comment thread examples/09_MultiTurn/README.md Outdated
Comment thread examples/09_MultiTurn/agentic_coding_benchmark.yaml Outdated
type: performance
# Replace with the path where you saved the converted flat-row JSONL.
# Run: python scripts/convert_agentic_snapshot.py <input.jsonl> <output_flat.jsonl> --verify
path: /model/agentic_coding_flat.jsonl
Collaborator


Ideally, these paths should be relative and match the README example, so that anyone who follows the README gets a config that works out of the box.

Comment thread examples/09_MultiTurn/agentic_workflow_benchmark.yaml
Comment thread docs/MULTI_TURN_QUICKSTART.md
Comment thread MULTI_TURN_QUICKSTART.md Outdated
Comment thread docs/MULTI_TURN_QUICKSTART.md Outdated
Comment thread MULTI_TURN_QUICKSTART.md Outdated
tianmu-li added a commit to tianmu-li/endpoints that referenced this pull request Apr 22, 2026
- Remove sequential conversation mode (redundant with target_concurrency=1)
- Remove `enabled` field from MultiTurnConfig; presence of multi_turn: block implies enabled
- Add conversation grouping validation to MultiTurnDataset (raises InputValidationError if rows for a conversation_id are not consecutive)
- Update YAML example configs: model placeholder, relative dataset paths, removed redundant metrics.collect
- Move MULTI_TURN_QUICKSTART.md to docs/
- Update all documentation to remove sequential mode references

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
tianmu-li added a commit to tianmu-li/endpoints that referenced this pull request Apr 22, 2026
…wer comments

- Remove dead constant BLOCK_ON_PREVIOUS_TURN = -1 from scheduler.py
- Remove redundant outer with state.condition: in mark_turn_complete
- Remove ConversationMode import and explicit mode= args from integration tests
- Fix format: jsonl → format: ".jsonl" in example YAMLs and docs
- Add target_concurrency: 1 clarification to quickstart (preserves turn ordering)
- Remove broken HYBRID_SCHEDULER_GUIDE.md reference from quickstart

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@tianmu-li tianmu-li force-pushed the feat/tool_sequences branch from d8dfd64 to b127845 Compare April 25, 2026 22:53
tianmu-li and others added 7 commits April 27, 2026 11:44
Add MultiTurnDataset, MultiTurnConfig schema, tool-calling types,
Query.metadata transport field, adapter tools= kwarg, and multi-turn
factory routing.
Add per-conversation asyncio.Event sequencing (ConversationManager),
async turn pipeline (MultiTurnStrategy), and benchmark execution wiring
(execute.py, session.py PhaseIssuer data_override).
Add unit tests for MultiTurnDataset, ConversationManager, and
MultiTurnStrategy; add integration tests including tool-use scenarios
and large-concurrency stress tests.
Consolidate multi-turn dataset with single-turn transform pipeline,
fix prior-row extraction, live-history mode, system prompt injection,
tool_calls preservation, and asyncio.Event-based sequencing.
Add MULTI_TURN_QUICKSTART.md, examples/09_MultiTurn/ configs and sample
data, scripts/convert_agentic_snapshot.py, and README clarifications
including conversion script output destination.
…ring

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…efault path

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@tianmu-li tianmu-li force-pushed the feat/tool_sequences branch from b127845 to 0a7ad37 Compare April 27, 2026 20:09
@arekay-nv
Collaborator

arekay-nv commented Apr 27, 2026

⚠️ Superseded. This comment was posted while a pending review was blocking inline-comment delivery. The review has since been re-posted with all 14 inline comments attached: see review #4184417968 and the updated summary. The findings are unchanged.

Collaborator

@arekay-nv arekay-nv left a comment


Review Council — Multi-AI Code Review

Reviewed by: Claude (Codex review failed with a CLI config error — invalid 'features' requirement 'browser_use' from cloud requirements) | Depth: thorough

Found 15 issues (0 critical, 2 high, 5 medium, 8 low). 14 posted as inline comments, 1 in summary table only (line outside diff hunk).

id=result_id or response.id,
response_output=TextModelOutput(output=response.choices[0].message.content),
response_output=TextModelOutput(output=choice.message.content or ""),
metadata=metadata if metadata else None,
Collaborator


🟠 HIGH · bug · [Claude]

from_endpoint_response violates the QueryResult.metadata type contract by passing metadata=None when no tool_calls/finish_reason/reasoning_content are present. QueryResult.metadata is typed dict[str, Any] with default_factory=dict. The constructed result is then passed to worker._handle_non_streaming_body which calls result.with_metadata(req.query_metadata). with_metadata short-circuits on empty additional_metadata (current callers pass {}), but as soon as any caller routes a non-empty Query.metadata (e.g., the conversation_id round-trip explicitly mentioned in Query.metadata's docstring), dict(self.metadata) will raise TypeError: 'NoneType' object is not iterable. Likewise, any code that does result.metadata.get(...) on this result will raise AttributeError. Fix:

return QueryResult(
    id=result_id or response.id,
    response_output=TextModelOutput(output=choice.message.content or ""),
    metadata=metadata,  # always pass dict; let omit_defaults handle empty
)

Collaborator Author


Fixed. metadata initialized to empty dict

for conv_id, turns in conv_samples.items()
]

await asyncio.gather(*tasks, return_exceptions=True)
Collaborator


🟠 HIGH · error-handling · [Claude]

await asyncio.gather(*tasks, return_exceptions=True) silently swallows every exception raised by a _conv_pipeline coroutine. The result list is discarded — there is no for r in results: if isinstance(r, Exception): .... Programmer bugs (KeyError on missing metadata key, AttributeError on result.metadata, dataset metadata-format drift, etc.) become invisible: the strategy returns phase_issuer.issued_count as if everything succeeded, and the only sign of failure is missing samples. AGENTS.md explicitly flags swallowed exceptions as forbidden. Fix: collect the result list and re-raise the first non-CancelledError exception, or at least log each one before discarding:

results = await asyncio.gather(*tasks, return_exceptions=True)
for r in results:
    if isinstance(r, BaseException) and not isinstance(r, asyncio.CancelledError):
        logger.error("conv_pipeline crashed: %r", r, exc_info=r)

Collaborator Author


Fixed. Errors are collected and logged.

logger.warning(
f"Turn {turn} of {conv_id} timed out waiting for previous turn"
)
state.failed_turns += 1
Collaborator


🟡 MEDIUM · bug · [Claude]

When a turn times out waiting for the previous turn's response, the pipeline does state.failed_turns += 1 but never increments state.completed_turns, then breaks out of the loop. As a result, ConversationState.is_complete() (completed_turns >= expected_client_turns) returns False forever, the conversation never logs completion, and downstream _log_if_complete reporting silently drops it. Worse, if the late response arrives later, mark_turn_complete runs on the abandoned state and bumps completed_turns once, leaving completed_turns=1, failed_turns=1 for what was a multi-turn conversation, with the remaining turns never issued — but no error is raised. Fix: bump both counters on timeout (or call self._conv_manager.mark_turn_failed(...)), so the failure is properly accounted, and consider also draining/discarding the now-orphan _inflight entries for this conversation so a late response doesn't mutate state for a dead pipeline.

except TimeoutError:
    logger.warning(...)
    state.completed_turns += 1
    state.failed_turns += 1
    break

Collaborator Author


Fixed. Timeout calls mark_turn_failed() which bumps the counters.

query_id = uuid.uuid4().hex
data = self._dataset.load_sample(sample_index)
if data_override is not None:
data = {**data, **data_override}
Collaborator


🟡 MEDIUM · data-integrity · [Claude]

PromptData.text = data.get("prompt") and token_ids = data.get("input_tokens") or data.get("token_ids") — neither is set on multi-turn samples. MultiTurnDataset.load() writes the conversation into sample["messages"] (a list of message dicts) and does not populate prompt, input_tokens, or token_ids. The result is that every multi-turn ISSUED event records PromptData(text=None, token_ids=None), so the metrics aggregator computes ISL from zero or just the bare current-turn user text instead of the full prompt-with-history actually sent over the wire. ISL/throughput/TPOT-derived numbers reported for any multi-turn benchmark will therefore be wrong. Either serialize the merged messages into a synthetic prompt for ISL accounting, or extend PromptData to carry the OpenAI messages list and have the aggregator handle it. At minimum, document the limitation prominently and emit a one-shot warning when running multi-turn without a tokenizer.
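One possible shape for the first option, flattening the merged messages into a synthetic prompt for ISL accounting (`flatten_messages` is a hypothetical helper; the newline separator matches the convention adopted later in this PR):

```python
def flatten_messages(messages: list) -> str:
    """Join message contents with newlines so input-length accounting
    sees the full prompt-with-history, not just the current user turn."""
    parts = []
    for msg in messages:
        content = msg.get("content") or ""  # tolerate content=None on tool_call turns
        if content:
            parts.append(content)
    return "\n".join(parts)
```

The resulting string would feed `PromptData.text`, so tokenizer-based ISL estimation at least approximates what is sent over the wire.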

Collaborator Author


Fixed. Contents are joined into prompt_text.

if val and isinstance(val, str):
system_content = val
break
system_prompts_by_conv[str(conv_id)] = system_content
Collaborator


🟡 MEDIUM · bug · [Claude]

system_prompts_by_conv is keyed by str(conv_id) (line 212), but pre_built_messages_by_key and current_turn_messages_by_key are keyed by the raw conv_id returned from groupby (lines 266–267), and samples stores the raw value too. MultiTurnStrategy.execute() then looks up sys_prompts.get(conv_id) with the raw value (multi_turn_strategy.py line 116), so the system prompt is silently dropped whenever a JSONL has a non-string conversation_id (integer ids, mixed types after pandas type inference, etc.). Pick one canonicalization (preferably str(conv_id)) and apply it consistently across all dictionaries — including the samples entries and the _conv_states/_inflight keys in the strategy.
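The failure mode is easy to reproduce with plain dicts (illustrative values only):

```python
# system prompts keyed by str(conv_id) at dataset-build time ...
system_prompts = {str(7): "You are helpful."}

# ... but looked up with the raw conv_id that pandas inferred as an int
raw_id = 7
assert system_prompts.get(raw_id) is None                      # silent miss
assert system_prompts.get(str(raw_id)) == "You are helpful."   # canonicalized lookup
```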

Collaborator Author

@tianmu-li tianmu-li Apr 28, 2026


Fixed. All keys now use str(conv_id)

messages.append({"role": "system", "content": system_content})

# All dataset rows strictly before this client turn (includes
# assistant rows and prior tool results).
Collaborator


🔵 LOW · bug · [Claude]

When iterating prior_rows to build pre-built messages, a tool row whose tool_results field is an empty list (e.g. tool_results: []) is collected into msg, then _expand_tool_results(msg) returns [] (line 244 falls through), and the bare malformed msg (role='tool', no tool_call_id, no content, just the empty tool_results) is appended to the message history at line 246. This produces an OpenAI-invalid tool message. Either skip rows with empty tool_results entirely, or treat an empty list as a validation error in _validate_conversation_structure.
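A sketch of the skip-empty behavior (`expand_tool_results` here is a hypothetical stand-in for the `_expand_tool_results` helper referenced above; the result-dict keys are assumptions):

```python
def expand_tool_results(row: dict) -> list:
    """Expand a tool row's tool_results into OpenAI-style tool messages.
    An empty or missing tool_results list yields no messages at all, so no
    malformed bare {"role": "tool"} entry reaches the history."""
    results = row.get("tool_results") or []
    return [
        {"role": "tool", "tool_call_id": r["id"], "content": r["content"]}
        for r in results
    ]
```

The caller then extends the message history with the returned list, so the empty case is a no-op rather than an invalid message.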

Collaborator Author


Fixed. Empty tool results return []


@pytest.mark.integration
@pytest.mark.asyncio
async def test_tools_field_forwarded_to_endpoint(echo_server):
Collaborator


🔵 LOW · testing · [Claude]

The "tool use" integration tests verify that tools, tool_calls, and tool_results are forwarded to the endpoint and that all client turns are issued, but they only run against an EchoServer that does not validate tool_call_id pairing or generate fresh tool_call ids. As a result:

  • The live-history tool_call_id mismatch issue (see related issue) cannot be detected by these tests.
  • There is no test covering pipeline error propagation (today, a crash inside _conv_pipeline is silently swallowed by gather(return_exceptions=True)).
  • There is no concurrent-conversation stress test (>10 conversations under a non-trivial target_concurrency) that exercises the semaphore + timeout interaction.

Add at least: (1) a test using a fake server that asserts each tool message's tool_call_id matches a prior assistant tool_calls id under live-history mode (currently expected to fail), (2) a test asserting that an exception in a pipeline propagates out of MultiTurnStrategy.execute, and (3) a stress test with 50+ concurrent conversations and target_concurrency set, asserting all complete and the semaphore reaches its rest value.

Collaborator Author


Added a test with concurrent conversations and multiple turns.

"tool": {"assistant", "user"},
}

for conv_id, group in self.dataframe.groupby("conversation_id"):
Collaborator


🔵 LOW · performance · [Claude]

_validate_conversation_structure, _validate_turn_numbering, and _build_metadata each call self.dataframe.groupby("conversation_id") independently (lines 142, 164, 201, 280). For a dataset with thousands of conversations, these are repeated O(N log N) operations on top of a to_dict(orient="records") walk in _validate_conversation_grouping. Cache the groupby once at the top of __init__ and pass the grouped object into each helper. This is dataset-load-time only (cold path), so it's a low-priority cleanup, but the redundancy is currently visible in the code.
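In plain Python terms, the suggestion is to group once and hand the same grouping to each helper (a sketch under the assumption that the helpers can accept pre-grouped rows; `group_rows` is hypothetical):

```python
from collections import defaultdict

def group_rows(rows: list) -> dict:
    """Group dataset rows by conversation_id in a single pass."""
    grouped: dict = defaultdict(list)
    for row in rows:
        grouped[str(row["conversation_id"])].append(row)
    return dict(grouped)

rows = [
    {"conversation_id": "a", "turn": 1},
    {"conversation_id": "a", "turn": 2},
    {"conversation_id": "b", "turn": 1},
]
grouped = group_rows(rows)
# validate_structure(grouped); validate_turns(grouped); build_metadata(grouped)
```

With pandas, the equivalent is computing `self.dataframe.groupby("conversation_id")` once in `__init__` and passing the grouped object into each helper.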

Collaborator Author


Fixed

"""

conversation_id: str
turn_done: asyncio.Event = field(default_factory=asyncio.Event)
Collaborator


🔵 LOW · concurrency · [Claude]

turn_done: asyncio.Event = field(default_factory=asyncio.Event) is constructed at ConversationState instantiation time, before MultiTurnStrategy.execute runs. In Python 3.10+ this no longer requires a running event loop at construction time, but the resulting Event is still bound to whichever loop first awaits/sets it. MultiTurnStrategy.__init__ similarly constructs asyncio.Semaphore(target_concurrency) (multi_turn_strategy.py line 84). Today _run_benchmark_async happens to construct both inside the eventual benchmark loop, but that's fragile — anyone who instantiates a MultiTurnStrategy outside the loop (tests, CLI helpers, future REPL) will silently get cross-loop primitives that hang on wait(). Defer creation to execute() (where the running loop is guaranteed to exist) or assert asyncio.get_running_loop() in __init__.
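A sketch of the deferred-creation option (the `TurnGate` class is hypothetical; the point is that `asyncio.get_running_loop()` fails fast outside a loop instead of handing back a cross-loop Event):

```python
import asyncio
from typing import Optional

class TurnGate:
    """Hypothetical wrapper that creates its Event lazily, inside a running loop."""

    def __init__(self) -> None:
        self._event: Optional[asyncio.Event] = None  # not created at construction

    @property
    def event(self) -> asyncio.Event:
        if self._event is None:
            asyncio.get_running_loop()  # raises RuntimeError outside a loop
            self._event = asyncio.Event()
        return self._event

async def main() -> None:
    gate = TurnGate()
    gate.event.set()           # safe: Event created inside the running loop
    assert gate.event.is_set()

asyncio.run(main())
```

The same lazy pattern applies to the `asyncio.Semaphore` constructed in `MultiTurnStrategy.__init__`.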

)

# Maps query_id -> conversation_id for routing completions.
self._inflight: dict[str, str] = {}
Collaborator


🔵 LOW · design · [Claude]

self._inflight: dict[str, str] = {} is mutated from two execution contexts (the per-conversation pipeline tasks at line 190, and on_sample_complete invoked from the recv task at line 218) and is never bounded. While Python's GIL makes the dict ops atomic and single-threaded asyncio prevents true concurrent access, every entry that doesn't get a response (timeouts, dropped responses on shutdown, etc.) leaks for the lifetime of the strategy. For a long-running benchmark with many timeouts, this is a silent memory/diagnosability issue. Consider clearing _inflight entries explicitly in the timeout branch (line 165) and on session-stop, and add a debug-time assertion that _inflight is empty when execute returns.
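A sketch of the cleanup the comment asks for (class and method names are hypothetical; the `query_id -> conversation_id` mapping mirrors the one under review):

```python
from typing import Optional

class InflightTracker:
    """Hypothetical bookkeeping for in-flight queries with explicit reclamation."""

    def __init__(self) -> None:
        self._inflight: dict = {}  # query_id -> conversation_id

    def issue(self, query_id: str, conv_id: str) -> None:
        self._inflight[query_id] = conv_id

    def complete(self, query_id: str) -> Optional[str]:
        # pop (not get) so completed entries are reclaimed immediately
        return self._inflight.pop(query_id, None)

    def abandon_conversation(self, conv_id: str) -> int:
        # timeout path: drop every orphan entry for the dead pipeline
        stale = [q for q, c in self._inflight.items() if c == conv_id]
        for q in stale:
            del self._inflight[q]
        return len(stale)
```

At session stop, asserting the tracker is empty gives the debug-time check suggested above.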

Collaborator


+1

@arekay-nv
Collaborator

Review Council — Multi-AI Code Review

Reviewed by: Claude | Depth: thorough | Commit: 0a7ad37

14 inline comments posted in review #4184417968. 1 finding (openai_adapter.py:131) is summary-only because the line falls outside the diff hunk.

Found 15 issues (0 critical, 2 high, 5 medium, 8 low). Codex review failed with a CLI config error (invalid features requirement browser_use from cloud requirements) — Claude-only review.

🟠 Must Fix (high)

Issues that will cause incorrect behavior users will hit in normal usage.

# Location Category Summary
1 src/inference_endpoint/openai/openai_msgspec_adapter.py:222 bug from_endpoint_response violates the QueryResult.metadata type contract by passing metadata=None when no…
2 src/inference_endpoint/load_generator/multi_turn_strategy.py:137 error-handling await asyncio.gather(*tasks, return_exceptions=True) silently swallows every exception raised by a _conv_pipeline coroutine. The result…

🟡 Should Fix (medium)

Real issues that trigger under specific conditions, or design flaws that will compound.

# Location Category Summary
3 src/inference_endpoint/load_generator/multi_turn_strategy.py:164 bug When a turn times out waiting for the previous turn's response, the pipeline does state.failed_turns += 1 but never increments…
4 src/inference_endpoint/load_generator/session.py:206 data-integrity PromptData.text = data.get("prompt") and token_ids = data.get("input_tokens") or data.get("token_ids") — neither is set on multi-turn…
5 src/inference_endpoint/dataset_manager/multi_turn_dataset.py:212 bug system_prompts_by_conv is keyed by str(conv_id) (line 212), but pre_built_messages_by_key and current_turn_messages_by_key are…
6 src/inference_endpoint/load_generator/multi_turn_strategy.py:180 bug In live-history mode (use_dataset_history=False), the per-turn tool message reuses the dataset's hardcoded tool_call_id (e.g.…
7 src/inference_endpoint/load_generator/conversation_manager.py:142 error-handling mark_turn_complete and mark_turn_failed raise KeyError if conversation_id is missing. These are invoked from…

🔵 Consider (low)

Valid improvements that could be follow-ups.

# Location Category Summary
8 src/inference_endpoint/config/runtime_settings.py:200 design self.load_pattern.type.value == "multi_turn" compares the enum value as a string literal instead of the enum:…
9 src/inference_endpoint/config/schema.py:253 design MultiTurnConfig uses model_config = {"extra": "forbid"} (raw dict) and is missing frozen=True, while every other config model in this…
10 src/inference_endpoint/dataset_manager/multi_turn_dataset.py:222 bug When iterating prior_rows to build pre-built messages, a tool row whose tool_results field is an empty list (e.g. tool_results: [])…
11 tests/integration/test_multi_turn.py:559 testing The "tool use" integration tests verify that tools, tool_calls, and tool_results are forwarded to the endpoint and that all client…
12 src/inference_endpoint/dataset_manager/multi_turn_dataset.py:142 performance _validate_conversation_structure, _validate_turn_numbering, and _build_metadata each call self.dataframe.groupby("conversation_id")
13 src/inference_endpoint/load_generator/conversation_manager.py:45 concurrency turn_done: asyncio.Event = field(default_factory=asyncio.Event) is constructed at ConversationState instantiation time, before…
14 src/inference_endpoint/load_generator/multi_turn_strategy.py:95 design self._inflight: dict[str, str] = {} is mutated from two execution contexts (the per-conversation pipeline tasks at line 190, and…
15 src/inference_endpoint/openai/openai_adapter.py:131 (summary-only) api-contract OpenAIAdapter.from_endpoint_response (the non-msgspec adapter, used when callers explicitly select it) does not extract tool_calls,…

tianmu-li and others added 2 commits April 28, 2026 10:37
…tation

Fix 15 review issues across severity levels:
- HIGH: metadata=None crash in msgspec adapter, silent exception swallowing in gather
- MEDIUM: timeout state consistency, conv_id canonicalization, PromptData fallback, conv_id guard
- LOW: enum comparison, frozen config, empty tool_results warning, adapter metadata extraction,
  groupby deduplication, live-history tool warning, asyncio.Event docs, test TODO

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Use newline separators (instead of spaces) when flattening messages to
text for ISL estimation, and add a 12-conversation concurrent stress test.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@tianmu-li
Collaborator Author

Hi @arekay-nv, I've addressed the comments. Appreciate it if you could take another look.

@tianmu-li tianmu-li marked this pull request as ready for review May 1, 2026 19:16
@tianmu-li tianmu-li requested review from a team and Copilot May 1, 2026 19:16

Copilot AI left a comment


Pull request overview

This PR updates the benchmarking system to support multi-turn conversational workloads (including tool-calling sequences), adds a dedicated multi-turn dataset format + conversion/validation tooling, and wires a new multi-turn load strategy into the benchmarking session and OpenAI adapters.

Changes:

  • Add MultiTurnDataset (flat-row JSONL format) with validation, metadata precomputation, and adapter-default handling for per-turn parameters/tools.
  • Add MultiTurnStrategy + ConversationManager to enforce per-conversation turn sequencing with optional global concurrency limiting, and integrate it into BenchmarkSession.
  • Extend OpenAI request/response handling for messages, tools, tool-call metadata, and streaming tool-call accumulation; add extensive unit/integration tests and multi-turn docs/examples/scripts.

Reviewed changes

Copilot reviewed 43 out of 44 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
tests/unit/openai/test_openai_adapter.py New unit tests for OpenAIAdapter tool serialization and tools forwarding.
tests/unit/openai/test_msgspec_adapter.py New unit tests for msgspec OpenAI adapter tool-call fields and message dict conversion.
tests/unit/load_generator/test_multi_turn_strategy.py New unit tests for turn sequencing, concurrency semaphore behavior, and metadata propagation.
tests/unit/load_generator/test_multi_turn_conversation_manager.py New unit tests for conversation state bookkeeping and event gating.
tests/unit/dataset_manager/test_transforms.py Add coverage for the new AddDefaultColumns transform.
tests/unit/dataset_manager/test_multi_turn_dataset.py Comprehensive tests for MultiTurnDataset, including tool sequences and metadata correctness.
tests/unit/core/test_types.py Add tests for QueryResult.with_metadata() and Query.metadata round-tripping.
tests/unit/config/test_schema.py Add tests for multi-turn config validation and multi-turn sample counting logic.
tests/integration/test_multi_turn.py End-to-end integration tests exercising dataset-history/live-history modes and tool-use conversations.
src/inference_endpoint/openai/types.py Extend msgspec OpenAI types to include tool_calls, tool_call_id, and tools.
src/inference_endpoint/openai/openai_msgspec_adapter.py Support messages input, tool-call fields, tools forwarding, and richer response metadata.
src/inference_endpoint/openai/openai_adapter.py Support messages input, tools forwarding, and return richer response metadata.
src/inference_endpoint/openai/accumulator.py Accumulate streamed tool_calls + finish_reason into final QueryResult.metadata.
src/inference_endpoint/load_generator/strategy.py Extend PhaseIssuerProtocol.issue() to accept an optional data_override.
src/inference_endpoint/load_generator/session.py Allow injecting a per-phase strategy; support data overrides in sample issuance.
src/inference_endpoint/load_generator/multi_turn_strategy.py New multi-turn strategy implementing per-conversation sequencing + global concurrency limiting.
src/inference_endpoint/load_generator/conversation_manager.py New synchronous conversation state manager used by multi-turn strategy.
src/inference_endpoint/endpoint_client/worker.py Propagate Query.metadata through requests and merge into results.
src/inference_endpoint/endpoint_client/http.py Add query_metadata field to InFlightRequest.
src/inference_endpoint/endpoint_client/adapter_protocol.py Generalize SSE decoding/parse APIs to return adapter-specific chunk objects.
src/inference_endpoint/dataset_manager/transforms.py Add AddDefaultColumns (fill-missing-only) transform.
src/inference_endpoint/dataset_manager/multi_turn_dataset.py New multi-turn dataset implementation with tool-sequence handling and metadata building.
src/inference_endpoint/dataset_manager/factory.py Select MultiTurnDataset when dataset config includes multi_turn; skip prompt-based transforms for it.
src/inference_endpoint/dataset_manager/init.py Export MultiTurnDataset and AddDefaultColumns.
src/inference_endpoint/core/types.py Add Query.metadata and QueryResult.with_metadata().
src/inference_endpoint/config/templates/online_template_full.yaml Expose multi_turn dataset block and multi_turn load pattern option in template.
src/inference_endpoint/config/templates/online_template.yaml Expose multi_turn load pattern option in template.
src/inference_endpoint/config/templates/offline_template_full.yaml Expose multi_turn dataset block and load pattern option in template.
src/inference_endpoint/config/templates/concurrency_template_full.yaml Expose multi_turn dataset block and load pattern option in template.
src/inference_endpoint/config/templates/concurrency_template.yaml Expose multi_turn load pattern option in template.
src/inference_endpoint/config/schema.py Add multi-turn schema objects and cross-validate dataset.multi_turn ↔ load_pattern.type.
src/inference_endpoint/config/runtime_settings.py Make multi-turn sample count issue all dataset client turns (min-sample-count aware).
src/inference_endpoint/commands/benchmark/execute.py Instantiate and wire MultiTurnStrategy automatically when using MultiTurnDataset.
scripts/validate_jsonl_schema.py New CLI script to validate multi-turn JSONL rows against schema.
scripts/multi_turn_dataset_schema.json New JSON Schema for multi-turn flat-row JSONL datasets.
scripts/convert_agentic_snapshot.py New conversion+verification script from snapshot-style agentic datasets to flat-row JSONL.
examples/09_MultiTurn/multi_turn_with_concurrency.yaml Example config: multi-turn with global concurrency limiting.
examples/09_MultiTurn/multi_turn_benchmark.yaml Example config: basic multi-turn benchmark.
examples/09_MultiTurn/datasets/.gitkeep Placeholder for converted example datasets.
examples/09_MultiTurn/customer_support_conversations.jsonl Example multi-turn dataset.
examples/09_MultiTurn/agentic_workflow_benchmark.yaml Example config for converted agentic workflow dataset.
examples/09_MultiTurn/agentic_coding_benchmark.yaml Example config for converted agentic coding dataset.
examples/09_MultiTurn/README.md Multi-turn feature documentation and agentic conversion guidance.
docs/MULTI_TURN_QUICKSTART.md Quickstart guide for running multi-turn benchmarks.


Comment on lines +275 to +281
samples.append(
{
"index": idx,
"conversation_id": str_conv_id,
"turn": t_n,
}
)
Copilot AI May 1, 2026

_build_metadata() currently appends sample entries with "index": idx where idx is the DataFrame row index from iterrows(). That index is not guaranteed to be a dense 0..N-1 mapping to Dataset.load_sample() indices (especially after filtering to client turns). To avoid incorrect sample issuance, store a dense sample_index that matches the position in self.data (client_turn_samples) and have the scheduler use that for PhaseIssuer.issue().
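A minimal illustration of why the dense index matters (the column names follow the metadata shape shown above; the fix itself is a hypothetical sketch, not the PR's code):

```python
import pandas as pd

# After filtering to client turns, the original DataFrame index is sparse
# (0, 2, ...), so iterrows() indices no longer match load_sample() positions.
df = pd.DataFrame(
    {
        "conversation_id": ["c1", "c1", "c1"],
        "turn": [1, 2, 3],
        "role": ["user", "assistant", "user"],
    }
)
client_turns = df[df["role"] == "user"]  # original indices: 0 and 2

# Store a dense position that matches self.data / load_sample() indexing,
# instead of the raw DataFrame index from iterrows().
samples = [
    {"sample_index": pos, "conversation_id": row["conversation_id"], "turn": row["turn"]}
    for pos, (_, row) in enumerate(client_turns.iterrows())
]
assert [s["sample_index"] for s in samples] == [0, 1]  # dense, not [0, 2]
```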

Comment on lines +108 to +112
conv_samples: dict[str, list[tuple[int, int]]] = defaultdict(list)
for sample_index, sample_meta in enumerate(self._dataset_metadata["samples"]):
conv_id = sample_meta["conversation_id"]
conv_samples[conv_id].append((sample_index, sample_meta["turn"]))

Copilot AI May 1, 2026

execute() uses enumerate(dataset_metadata["samples"]) to derive sample_index for phase_issuer.issue(). This assumes the metadata list order exactly matches the dataset's load_sample() indexing. With MultiTurnDataset, that coupling is fragile (and can break if metadata ordering differs from self.data). Prefer reading an explicit sample_index field from metadata (or another stable mapping) that is guaranteed to refer to the dataset sample index.

Comment thread src/inference_endpoint/openai/openai_adapter.py
Comment on lines +54 to +63
return [
{
"role": "tool",
"tool_call_id": result.get("tool_call_id"),
"content": result.get("content"),
}
for result in tool_results
]


Copilot AI May 1, 2026

_expand_tool_results() will emit tool messages even if a tool_results entry is missing tool_call_id or content (they become None). That produces invalid OpenAI wire-format messages and will likely fail downstream. Since tool rows are required to have non-empty tool_results with required fields (per schema/docs), consider validating each entry and raising InputValidationError when required keys are absent.

Suggested change
return [
{
"role": "tool",
"tool_call_id": result.get("tool_call_id"),
"content": result.get("content"),
}
for result in tool_results
]
expanded_results: list[dict] = []
for index, result in enumerate(tool_results):
if not isinstance(result, dict):
raise InputValidationError(
"Each tool_results entry must be an object "
f"(conversation_id={row.get('conversation_id')}, turn={row.get('turn')}, "
f"tool_result_index={index})."
)
if "tool_call_id" not in result or result["tool_call_id"] is None:
raise InputValidationError(
"Each tool_results entry must include a non-null 'tool_call_id' "
f"(conversation_id={row.get('conversation_id')}, turn={row.get('turn')}, "
f"tool_result_index={index})."
)
if "content" not in result or result["content"] is None:
raise InputValidationError(
"Each tool_results entry must include non-null 'content' "
f"(conversation_id={row.get('conversation_id')}, turn={row.get('turn')}, "
f"tool_result_index={index})."
)
expanded_results.append(
{
"role": "tool",
"tool_call_id": result["tool_call_id"],
"content": result["content"],
}
)
return expanded_results

Comment on lines +106 to +112
super().__init__(dataframe, **kwargs)
assert self.dataframe is not None, "Dataframe must be initialized"
self._conv_groups = dict(list(self.dataframe.groupby("conversation_id")))
self._validate_conversation_grouping()
self._validate_conversation_structure()
self._validate_turn_numbering()
self.conversation_metadata = self._build_metadata()
Copilot AI May 1, 2026

MultiTurnDataset builds self._conv_groups via dataframe.groupby("conversation_id"), which defaults to sorting keys. That can reorder conversations compared to file order, while load() later builds self.data in raw row order. If conversation_ids are not already lexicographically sorted, conversation_metadata["samples"] ordering can diverge from load_sample() indices, causing MultiTurnStrategy to issue the wrong samples/turns. Consider using groupby(..., sort=False) and/or explicitly constructing a dense sample_index in file order that is guaranteed to match self.data indices.
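A small pandas example of the ordering hazard described above:

```python
import pandas as pd

# Conversations appear in file order: "zeta" first, then "alpha".
df = pd.DataFrame({"conversation_id": ["zeta", "zeta", "alpha"], "turn": [1, 2, 1]})

# Default groupby sorts group keys lexicographically, reordering conversations
# relative to raw row order; sort=False preserves first-appearance order.
sorted_keys = list(df.groupby("conversation_id").groups)
file_order = list(df.groupby("conversation_id", sort=False).groups)

assert sorted_keys == ["alpha", "zeta"]  # diverges from file order
assert file_order == ["zeta", "alpha"]   # matches file order
```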

Collaborator

@arekay-nv arekay-nv left a comment

There is some confusion around the issue policy, let's sync today.

Comment thread docs/MULTI_TURN_QUICKSTART.md Outdated
Comment on lines +155 to +159
**Sizing guide**:

- Small (< 50 convs): `target_concurrency: 32`
- Medium (50-500 convs): `target_concurrency: 64`
- Large (500+ convs): `target_concurrency: 96` or higher
Collaborator

What are the metrics for these suggestions? For Small (< 50 convs) does a concurrency of 32 give better performance? I would assume the model size/server capability also plays a role in the concurrency settings.

Collaborator Author

Removed this part. Only keeping concurrency definition.

Comment thread docs/MULTI_TURN_QUICKSTART.md Outdated
workers: 16 # More workers for parallel conversations
```

### Long Conversations
Collaborator

Clarify with "conversations with a large number of turns" - a long conversation can also mean a small number of long turns, since end-to-end time could be large there as well.

Collaborator Author

Same as above. Removed this exact phrasing.

Comment on lines +216 to +218
**Problem**: Your dataset doesn't follow a valid role sequence.

**Fix**: Check your JSONL. Valid sequences:
Collaborator

Is it possible to add a utility that parses the dataset to make sure it is compliant, so devs can use it instead of running the benchmark for testing?

Collaborator Author

Added documentation to use scripts/validate_jsonl_schema.py for validation

)

# Maps query_id -> conversation_id for routing completions.
self._inflight: dict[str, str] = {}
Collaborator

+1

Comment on lines +129 to +135
tasks = [
asyncio.create_task(
self._conv_pipeline(conv_id, turns, phase_issuer),
name=f"mt-pipeline-{conv_id}",
)
for conv_id, turns in conv_samples.items()
]
Collaborator

So one task for each conversation?

Comment on lines +110 to +127
def test_encode_request_produces_valid_json_bytes():
"""encode_request returns bytes that msgspec can decode back."""
messages = [{"role": "user", "content": "Hello"}]
query = Query(
id="q2",
data={
"model": "m",
"messages": messages,
"max_completion_tokens": 64,
"stream": False,
},
)
request = OpenAIAdapter.to_endpoint_request(query)
encoded = OpenAIAdapter.encode_request(request)

assert isinstance(encoded, bytes)
decoded = msgspec.json.decode(encoded)
assert decoded["messages"][0]["role"] == "user"
Collaborator

Is this related to the multi-turn testing?

Collaborator Author

Yes, more specifically for tool call handling

Comment on lines +180 to +182
# Acquire concurrency slot before issuing.
if self._sem is not None:
await self._sem.acquire()
Collaborator

This is something that needs fixing/clarification. As-is, this would mean that we have multiple conversations open, with N of them (N being the concurrency) sending requests. If I understand correctly, we only want to have N conversations open, with a turn from each currently being processed.
Launching tasks for each conversation has this drawback: we cannot schedule tasks from a subset of conversations, as all conversations will be available to run.

Collaborator Author

Refactored to advance turns in on_sample_complete. Only N conversations are kept open, and a new conversation only starts when an existing one finishes.
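The completion-driven model described in this thread can be sketched roughly as follows (class and callback names are hypothetical, not the PR's actual API):

```python
from collections import deque


class TurnSequencer:
    """Sketch of the refactored model: at most `target_concurrency`
    conversations are active, each with exactly one in-flight turn; the next
    turn (or a new conversation) is issued from the sample-complete handler."""

    def __init__(self, conversations, target_concurrency, issue):
        self._pending = deque(conversations.items())  # conv_id -> list of turns
        self._active = {}                             # conv_id -> remaining turns
        self._issue = issue                           # callback: issue(conv_id, turn)
        for _ in range(target_concurrency):
            self._start_next_conversation()

    def _start_next_conversation(self):
        if self._pending:
            conv_id, turns = self._pending.popleft()
            self._active[conv_id] = deque(turns)
            self._issue(conv_id, self._active[conv_id].popleft())

    def on_sample_complete(self, conv_id):
        remaining = self._active[conv_id]
        if remaining:
            # Advance the same conversation before opening a new one.
            self._issue(conv_id, remaining.popleft())
        else:
            del self._active[conv_id]
            self._start_next_conversation()
```

With `target_concurrency=2`, a third conversation only starts once one of the first two has drained all its turns.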

Comment on lines +229 to +230
if self._sem is not None:
self._sem.release()
Collaborator

If we are going to issue turns from a conversation before moving onto a new conversation, this needs to move to the sample_complete request.

Collaborator Author

Refactored. Same as above



@dataclass(frozen=True, slots=True)
@dataclass(frozen=True)
Collaborator

Revert.

Collaborator Author

Done

tianmu-li and others added 4 commits May 4, 2026 11:09
…Strategy

target_concurrency now limits active conversations (not in-flight requests).
N worker tasks pull from asyncio.Queue, each processing one full conversation
before taking the next. Also adds slots=True back to PhaseConfig and sort=False
to groupby for file-order preservation.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…n implementation

- openai_adapter: normalize null content to "" instead of literal "None"
  to avoid polluting conversation history in tool-calling responses
- multi_turn_dataset: validate tool_results entries have required
  tool_call_id and content fields; raise InputValidationError at load time
- multi_turn_dataset: remove unused "index" field from samples metadata
- multi_turn_strategy: wrap mark_turn_complete/mark_turn_failed in
  try/except KeyError in on_sample_complete
- multi_turn_strategy: clear _inflight at end of execute() with warning
  if entries remain (transport failure or session abort)
- docs: remove prescriptive concurrency sizing guide; replace with
  definition of what target_concurrency controls
- docs: rename "Long Conversations" to "Conversations with Many Turns"
- docs: add dataset validation utility reference in Troubleshooting

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Fix refusal field set to literal string "None" instead of "" in
  openai_adapter.py — made downstream refusal checks incorrectly truthy
- Add test_pipeline_error_propagated to verify execute() re-raises
  worker exceptions instead of swallowing them via gather(return_exceptions=True)
- Clarify MultiTurnStrategy docstring and MULTI_TURN_QUICKSTART.md:
  target_concurrency = simultaneous conversations (not requests);
  each active conversation has exactly 1 in-flight turn at a time
- Remove unjustified "Common Configurations" section from quickstart
- Correct misleading "workers = concurrent conversations" tip; clarify
  client.workers and target_concurrency are independent layers

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ategy

Rewrites MultiTurnStrategy to issue subsequent turns synchronously inside
on_sample_complete() (zero event-loop delay), removing pre-spawned worker
tasks and per-conversation asyncio.Event waiting. ConversationState no
longer holds an asyncio.Event; sequencing is driven entirely by the
strategy. Addresses PR mlcommons#285 reviewer request to move turn issuance into
the sample-complete handler.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 4, 2026 22:17
Copilot AI left a comment

Pull request overview

Copilot reviewed 43 out of 44 changed files in this pull request and generated 2 comments.

Comments suppressed due to low confidence (2)

src/inference_endpoint/endpoint_client/adapter_protocol.py:134

  • parse_sse_chunk now appends whatever decode_sse_message returns, including None (e.g., when an SSE message has no choices). This contradicts the docstring (“Silently ignores non-content SSE messages”) and forces downstream accumulators to defensively handle None. Consider filtering out None return values here (and/or handling exceptions per JSON doc) so call sites only see meaningful chunk objects.
    def parse_sse_chunk(cls, buffer: bytes, end_pos: int) -> list[Any]:
        """
        Parse SSE chunk and extract all chunk objects.

        Extracts JSON documents from SSE stream and decodes them to chunk objects.
        Silently ignores non-content SSE messages (role, finish_reason, etc).

        Args:
            buffer: Byte buffer containing SSE data
            end_pos: End position in buffer to parse up to

        Returns:
            List of chunk objects extracted from the SSE chunk
        """
        json_docs = cls.SSE_DATA_PATTERN.findall(buffer[:end_pos])
        parsed_contents = []

        try:
            for json_doc in json_docs:
                content = cls.decode_sse_message(json_doc)
                parsed_contents.append(content)
        except Exception:
            # Normal for non-content SSE messages (role, finish_reason, etc)
            pass

        return parsed_contents

src/inference_endpoint/core/types.py:242

  • The Query docstring’s gc=False note only mentions data/headers, but Query now also has a mutable metadata dict. To avoid future misuse (and to match the more explicit QueryResult guidance), consider updating this note to include metadata as well and/or adding an AT-RISK (gc=False) warning that data/metadata/headers must not be mutated to introduce cycles.
    Attributes:
        id: Unique identifier for this query (auto-generated UUID).
        data: Request payload as a dictionary (typically contains prompt, model, etc.).
        metadata: Internal metadata that round-trips through transport (e.g., conversation_id).
        headers: HTTP headers to include in the request (e.g., authorization).
        created_at: Timestamp when query was created (seconds since epoch).

    Example:
        >>> query = Query(
        ...     data={"prompt": "Hello", "model": "Qwen/Qwen3-8B", "max_tokens": 100},
        ...     headers={"Authorization": "Bearer token123"},
        ... )

    Note:
        gc=False: Safe because data/headers are simple key-value pairs without cycles.
        Do NOT store self-referential or cyclic structures in data/headers fields.



Comment on lines 44 to 50
class SSEDelta(msgspec.Struct, frozen=True, kw_only=True, omit_defaults=True, gc=False): # type: ignore[call-arg]
"""SSE delta object containing content."""

content: str = ""
reasoning: str = ""
tool_calls: list[dict[str, Any]] | None = None

raise ValueError(
f"Conversation {conv_id} has invalid role sequence at turn "
f"{row['turn']}: got '{role}' after state '{state}'"
)
@tianmu-li
Collaborator Author

Hi @arekay-nv, I've addressed the comments. Main change is refactoring to an event-based model and limiting number of active conversations to concurrency. Appreciate it if you could take another look.

Collaborator

@arekay-nv arekay-nv left a comment

Almost there - the only issue is the stickiness of the client-turns.

Comment on lines +548 to +552
def _on_sample_complete(result: QueryResult) -> None:
if multi_turn_strategy is not None:
multi_turn_strategy.on_sample_complete(result)
collector.on_complete_hook(result)

Collaborator

Can we create two versions of this and pass it to benchmark session conditionally. This way, we are not checking the strategy on each sample-complete event.

Collaborator Author

Updated. The check is now only applied to multi-turn

Comment thread src/inference_endpoint/config/schema.py Outdated
Comment on lines +68 to +73
class ConversationMode(str, Enum):
"""Multi-turn conversation scheduling modes."""

INDEPENDENT = "independent" # Per-conv pipelines; no cross-conv turn barrier


Collaborator

Is this needed? Can we remove it until we have two different modes?

Collaborator Author

Removed all references to independent scheduler.

Comment on lines +130 to +153
class AddDefaultColumns(Transform):
"""Add columns only where values are missing (NaN or absent).

Unlike AddStaticColumns which unconditionally overwrites, this preserves
existing non-null values — dataset per-row overrides take precedence over
the supplied defaults.
"""

def __init__(self, data: dict[str, Any]):
"""Initialize the AddDefaultColumns transform."""
self.data = data

def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
"""Fill missing columns with defaults without overwriting existing values."""
for key, value in self.data.items():
if value is None:
continue
if key in df.columns:
df[key] = df[key].where(pd.notna(df[key]), value)
else:
df[key] = value
return df


Collaborator

Can this be implemented by using AddStaticColumns and a boolean indicating whether to overwrite or not?

Collaborator Author

@tianmu-li tianmu-li May 6, 2026

Updated. Removed AddDefaultColumns and all references to it.
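A minimal sketch of what the merged transform might look like (the `overwrite` parameter name is an assumption, not confirmed from the PR diff):

```python
import pandas as pd


class AddStaticColumns:
    """Sketch: overwrite=True keeps the original unconditional-set behavior;
    overwrite=False fills only missing (NaN or absent) values, so per-row
    dataset overrides take precedence over supplied defaults."""

    def __init__(self, data, overwrite=True):
        self.data = data
        self.overwrite = overwrite

    def __call__(self, df):
        for key, value in self.data.items():
            if value is None:
                continue
            if self.overwrite or key not in df.columns:
                df[key] = value
            else:
                # Fill only where the existing value is missing.
                df[key] = df[key].where(pd.notna(df[key]), value)
        return df
```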

Comment thread src/inference_endpoint/core/types.py Outdated
else:
return "<EMPTY>"

def with_metadata(
Collaborator

Is this needed for anything other than conversation id? Can we use the query-id for this instead of having to wire the metadata through the whole round-trip.

Collaborator Author

Not needed. Removed

Comment thread tests/unit/config/test_schema.py Outdated
@pytest.mark.unit
def test_multi_turn_valid_config(self):
config = BenchmarkConfig(**self._make_online_multi_turn(concurrency=16))
from inference_endpoint.config.schema import LoadPatternType
Collaborator

move import to top

Collaborator Author

Done

Comment thread tests/unit/config/test_schema.py Outdated

@pytest.mark.unit
def test_multi_turn_uses_dataset_size_ignoring_duration(self):
from inference_endpoint.config.runtime_settings import RuntimeSettings
Collaborator

move import to top.

Collaborator Author

Done

@arekay-nv
Collaborator

@viraatc can you check the perf implications - there might be non-negligible overhead for non-agentic/multi-turn workloads.

tianmu-li and others added 3 commits May 6, 2026 16:32
…tation

- Remove ConversationMode enum (single-member) and mode field from
  MultiTurnConfig; drop mode: independent from YAML examples and docs
- Merge AddDefaultColumns into AddStaticColumns(overwrite=False)
- Replace per-call strategy check with construct-time branch in execute.py
- Normalize None tool-calling content to "" in openai_adapter.py
- Delete unused Query.metadata, QueryResult.with_metadata, and
  InFlightRequest.query_metadata plumbing
- Add role-specific validation in _validate_conversation_structure:
  tool rows require non-empty tool_results, assistant rows require
  content or tool_calls
- Backfill explicit sample_index into conversation_metadata["samples"];
  MultiTurnStrategy reads sample_meta["sample_index"] instead of enumerate
- Add AT-RISK gc=False docstring notes to openai/types.py structs with
  mutable container fields
- Rewrite dataset tool_call_ids with model-generated ids in live-history
  mode; add test_live_history_remaps_tool_call_id integration test
- Lift inline imports to top of test_schema.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Li, Tianmu <tianmu.li@intel.com>
Remove tool_call_id rewriting from live-history mode (last_assistant_tool_call_ids
field, ConversationManager population, MultiTurnStrategy rewrite logic) and the
corresponding integration test. Live-history improvements are not in scope for
this PR. Also revert the _mt_strategy closure capture in execute.py that was not
requested by any review comment, while keeping the is-None branch elimination.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 6, 2026 23:38
Signed-off-by: Li, Tianmu <tianmu.li@intel.com>
Copilot AI left a comment

Pull request overview

Copilot reviewed 43 out of 44 changed files in this pull request and generated 6 comments.

Comment on lines +207 to +212
logger.warning(
"Live-history mode with tool messages uses dataset "
"tool_call_ids; real endpoint IDs will differ "
"(conv=%s, turn=%d)",
conv_id,
turn,
Comment thread docs/MULTI_TURN_QUICKSTART.md Outdated
Comment on lines +138 to +139
This catches missing required fields, invalid role sequences, non-consecutive turn numbers, and
interleaved conversations — all errors that would otherwise surface at benchmark startup.
Comment thread examples/09_MultiTurn/README.md Outdated
Comment on lines +107 to +119
| Row role | Extra fields |
| -------------------------------- | ------------------------------------------------------------------ |
| `assistant` with tool calls | `tool_calls: [{id, type, function: {name, arguments}}]` |
| `tool` single result | `tool_call_id: <str>`, `content: <str>` |
| `tool` parallel results (merged) | `tool_results: [{tool_call_id, content}, ...]` |
| `user` or `tool` turns | `tools: [...]` (OpenAI tool definitions forwarded to the endpoint) |

Example rows from a converted agentic dataset:

```jsonl
{"conversation_id": "sim_001", "turn": 1, "role": "user", "content": "Fix the bug in foo.py", "system": "You are a coding agent.", "tools": [...]}
{"conversation_id": "sim_001", "turn": 2, "role": "assistant", "tool_calls": [{"id": "functions.bash:0", "type": "function", "function": {"name": "bash", "arguments": "{\"cmd\": \"cat foo.py\"}"}}]}
{"conversation_id": "sim_001", "turn": 3, "role": "tool", "tool_call_id": "functions.bash:0", "content": "def foo():\n return 1/0", "tools": [...]}
Comment on lines +203 to +205
result = max(self.min_sample_count, self.n_samples_from_dataset)
logger.debug(
f"Sample count: {result} (multi-turn: issuing all {self.n_samples_from_dataset} client turns)"
Comment on lines +113 to 115
Extracts JSON documents from SSE stream and decodes them to chunk objects.
Silently ignores non-content SSE messages (role, finish_reason, etc).

Comment on lines +116 to +120
```jsonl
{"conversation_id": "sim_001", "turn": 1, "role": "user", "content": "Fix the bug in foo.py", "system": "You are a coding agent.", "tools": [...]}
{"conversation_id": "sim_001", "turn": 2, "role": "assistant", "tool_calls": [{"id": "functions.bash:0", "type": "function", "function": {"name": "bash", "arguments": "{\"cmd\": \"cat foo.py\"}"}}]}
{"conversation_id": "sim_001", "turn": 3, "role": "tool", "tool_call_id": "functions.bash:0", "content": "def foo():\n return 1/0", "tools": [...]}
{"conversation_id": "sim_001", "turn": 4, "role": "assistant", "content": "The bug is a ZeroDivisionError. Here is the fix: ..."}
tianmu-li and others added 2 commits May 7, 2026 17:20
Tool-call tokens were completely excluded from output sequence length,
TPOT, and TPS because they were only stored in QueryResult.metadata and
never reached TextModelOutput or EventRecord.data.

- Add `tool_calls` field to TextModelOutput; __str__ and
  text_after_first_chunk include JSON-encoded tool calls so the full
  generation is counted
- Add as_message_parts / as_message_parts_after_first_chunk helpers for
  chat-template-aware tokenization in the metrics pipeline
- OpenAI SSE accumulator populates tool_calls in TextModelOutput and
  emits a zero-length sentinel StreamChunk on the first tool-call delta
  so TTFT fires for agentic (content-free) responses
- Both OpenAI adapters (msgspec and pydantic) route tool_calls into
  TextModelOutput in addition to metadata
- TokenizePool gains token_count_message / token_count_message_async
  using apply_chat_template + baseline subtraction, with fallback to
  whitespace tokenization when the template raises
- OslTrigger and TpotTrigger override the new _extract_message hook to
  use the message tokenization path when tool_calls are present
- Forward `tools` key through MultiTurnDataset per-conversation defaults

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Two bugs in TokenizePool._token_count_message_worker caused OSL / TPOT
to be inflated for every response containing tool_calls:

1. tool_calls[].function.arguments arrives as the OpenAI wire-format JSON
   string, but Hermes-style chat templates (Qwen3-Coder, etc.) iterate
   arguments as a mapping. Passing a string raises, and the code silently
   fell through to whitespace-splitting content + reasoning + json(tool_calls) —
   counting every JSON bracket, quote, and escape as its own token.
   Fixed by parsing arguments to dict before rendering.

2. apply_chat_template rejects assistant-only message lists on several
   templates ("No user query found in messages"). The render also raised,
   forcing the fallback path. Fixed by prepending an empty user message
   and subtracting its token length back out.

Also switched the render path from tokenize=True (which returns a single-
element [Encoding] in recent transformers, so len() was 1) to
tokenize=False followed by tokenizer.tokenize(rendered), matching how
_token_count_worker measures plain text.

Verified on a real Qwen3.6-35B-A3B response: a tool-calling turn that
previously reported 130 tokens now reports 100, matching the raw-bytes
reference of 102 (2-token delta is the template's <think>\\n scaffolding).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
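The two fixes in this commit can be sketched roughly as follows (the helper name and surrounding structure are assumptions; the real worker lives in TokenizePool):

```python
import json


def token_count_message(tokenizer, message):
    """Sketch: count tokens for a single assistant message via the chat
    template. Fix 1: parse wire-format `arguments` JSON strings into dicts
    before rendering (Hermes-style templates iterate them as mappings).
    Fix 2: prepend an empty user message so templates that reject
    assistant-only lists still render, then subtract its token cost."""
    msg = dict(message)
    if msg.get("tool_calls"):
        fixed = []
        for tc in msg["tool_calls"]:
            args = tc.get("function", {}).get("arguments")
            if isinstance(args, str):
                tc = {**tc, "function": {**tc["function"], "arguments": json.loads(args)}}
            fixed.append(tc)
        msg["tool_calls"] = fixed
    pad = {"role": "user", "content": ""}
    # tokenize=False + tokenizer.tokenize(rendered) matches the plain-text
    # path; tokenize=True can return a single-element [Encoding] on recent
    # transformers, so len() would be 1.
    rendered = tokenizer.apply_chat_template([pad, msg], tokenize=False)
    baseline = tokenizer.apply_chat_template([pad], tokenize=False)
    return len(tokenizer.tokenize(rendered)) - len(tokenizer.tokenize(baseline))
```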
Copilot AI review requested due to automatic review settings May 7, 2026 23:16
Copilot AI left a comment

Pull request overview

Copilot reviewed 50 out of 51 changed files in this pull request and generated 5 comments.

Comment on lines +206 to +211
if prompt_text is None and "messages" in data:
parts: list[str] = [
m["content"]
for m in data["messages"]
if isinstance(m, dict) and m.get("content")
]
tool_calls_json = (
msgspec.json.encode(list(tool_calls)).decode() if tool_calls else ""
)
fallback_text = (content or "") + (reasoning or "") + tool_calls_json
Comment on lines +109 to 115
def parse_sse_chunk(cls, buffer: bytes, end_pos: int) -> list[Any]:
"""
Parse SSE chunk and extract all content strings.
Parse SSE chunk and extract all chunk objects.

Extracts JSON documents from SSE stream and decodes them to content strings.
Extracts JSON documents from SSE stream and decodes them to chunk objects.
Silently ignores non-content SSE messages (role, finish_reason, etc).

Comment on lines +342 to +343
from the adapter are applied via AddDefaultColumns (fill-missing-only) so that
per-row dataset overrides are preserved.
Comment thread docs/MULTI_TURN_QUICKSTART.md Outdated
Comment on lines +138 to +139
This catches missing required fields, invalid role sequences, non-consecutive turn numbers, and
interleaved conversations — all errors that would otherwise surface at benchmark startup.
tianmu-li and others added 2 commits May 8, 2026 01:25
- QUICKSTART: validate_jsonl_schema.py only does per-row JSON Schema
  checks; cross-row invariants (role sequences, turn numbering, grouping)
  are enforced by MultiTurnDataset at load time, not the script
- README: collapse single/merged tool rows into unified tool_results form
  to match what MultiTurnDataset._validate_conversation_structure enforces
- multi_turn_dataset.py: fix docstring referencing removed AddDefaultColumns

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Add _precompute_isl_for_multi_turn() in execute.py: runs
  apply_chat_template(messages, tokenize=True, add_generation_prompt=True)
  once per client turn at setup time and stores results in
  sample["input_tokens"], hitting the IslTrigger sync fast path
  (len(token_ids)) with zero hot-path cost.
- Add _extract_prompt_text() in session.py: refactors inline message
  content extraction to handle list-form multimodal content safely,
  fixing a crash when content is a list (e.g. vision/tool-call messages).
- Add unit tests for both helpers and two integration tests covering
  target_concurrency cap enforcement and pipeline exception propagation.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 8, 2026 05:53
Qwen3's fast tokenizer returns a BatchEncoding object from
apply_chat_template(tokenize=True) instead of a plain list[int].
Storing the BatchEncoding in sample["input_tokens"] caused a msgspec
serialization error at benchmark setup time. Extract .input_ids when
the return value has that attribute; fall back to the plain list otherwise.

Add a regression test using a mock BatchEncoding so this is caught
before it can regress again.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
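The fix described above can be sketched on its own. `FakeBatchEncoding` is a stand-in for the object Qwen3's fast tokenizer returns from `apply_chat_template(tokenize=True)` (an assumption made just for this sketch, mirroring the regression test's mock):

```python
from typing import Any

class FakeBatchEncoding:
    """Stand-in for transformers.BatchEncoding: wraps token ids in .input_ids."""
    def __init__(self, input_ids: list[int]) -> None:
        self.input_ids = input_ids

def extract_token_ids(result: Any) -> list[int]:
    """Normalize apply_chat_template(tokenize=True) output to a plain list.

    Some fast tokenizers return a BatchEncoding instead of list[int];
    storing that object directly in sample["input_tokens"] breaks msgspec
    serialization, so pull out .input_ids when the attribute is present.
    """
    ids = result.input_ids if hasattr(result, "input_ids") else result
    return list(ids)
```

Both return shapes now normalize to the same plain list, so downstream serialization sees only `list[int]`.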

Copilot AI left a comment


Pull request overview

Copilot reviewed 52 out of 53 changed files in this pull request and generated 2 comments.

Comments suppressed due to low confidence (1)

src/inference_endpoint/endpoint_client/adapter_protocol.py:134

  • parse_sse_chunk() wraps the entire decode loop in a single try/except. If decoding one SSE JSON doc fails (e.g., because of an unmodeled field like OpenAI’s initial delta.role), the method drops all subsequent docs in the same buffer. Catch exceptions per json_doc instead, and consider filtering out None returns so workers don’t forward no-op chunks to accumulators.
        json_docs = cls.SSE_DATA_PATTERN.findall(buffer[:end_pos])
        parsed_contents = []

        try:
            for json_doc in json_docs:
                content = cls.decode_sse_message(json_doc)
                parsed_contents.append(content)
        except Exception:
            # Normal for non-content SSE messages (role, finish_reason, etc)
            pass

        return parsed_contents
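The per-doc handling Copilot suggests could look like the sketch below. `SSE_DATA_PATTERN` and `decode_sse_message` are illustrative stand-ins for the adapter's real regex and decoder, defined here only so the sketch is self-contained:

```python
import json
import re
from typing import Any

# Illustrative stand-ins for the adapter's real pattern and decoder.
SSE_DATA_PATTERN = re.compile(rb"data: (\{.*?\})\n")

def decode_sse_message(json_doc: bytes) -> Any:
    obj = json.loads(json_doc)
    # Raises KeyError for non-content deltas (role, finish_reason, etc.),
    # standing in for the decode failures the review comment describes.
    return obj["choices"][0]["delta"]["content"]

def parse_sse_chunk(buffer: bytes, end_pos: int) -> list[Any]:
    """Catch per json_doc so one bad SSE doc no longer drops the rest."""
    json_docs = SSE_DATA_PATTERN.findall(buffer[:end_pos])
    parsed: list[Any] = []
    for json_doc in json_docs:
        try:
            content = decode_sse_message(json_doc)
        except Exception:
            continue  # skip only this doc, keep parsing the rest
        if content is not None:
            parsed.append(content)  # filter no-op chunks too
    return parsed
```

With this shape, a leading role-only delta is skipped while the content deltas that follow it in the same buffer are still returned.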

Comment on lines 45 to +56
  class SSEDelta(msgspec.Struct, frozen=True, kw_only=True, omit_defaults=True, gc=False):  # type: ignore[call-arg]
-     """SSE delta object containing content."""
+     """SSE delta object containing content.
+
+     AT-RISK (gc=False): Has mutable container field `tool_calls`. Any change that
+     mutates `tool_calls` after construction or stores cyclic references in it
+     must be audited; if so, remove gc=False.
+     """

-     content: str = ""
-     reasoning: str = ""
+     content: str | None = None
+     reasoning_content: str | None = None  # SGLang / DeepSeek field name
+     reasoning: str | None = None  # vLLM field name
+     tool_calls: list[dict[str, Any]] | None = None
### Validation Rules

1. All rows for a given `conversation_id` must appear **consecutively** in the file (no interleaving
with rows from other conversations). Turns within a conversation must be in order.
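The two invariants above (consecutive grouping, in-order turns) can be checked in one pass over the rows. This is a sketch of what the dataset's load-time validation might enforce, not the actual MultiTurnDataset implementation; the `conversation_id` and 0-based `turn` field names are assumptions for the sketch:

```python
def validate_conversation_order(rows: list[dict]) -> None:
    """Raise ValueError if conversations interleave or turns are out of order."""
    seen_ids: set[str] = set()
    current_id: str | None = None
    expected_turn = 0
    for i, row in enumerate(rows):
        conv_id = row["conversation_id"]
        if conv_id != current_id:
            # Revisiting an id after switching away means rows interleaved.
            if conv_id in seen_ids:
                raise ValueError(f"row {i}: conversation {conv_id!r} is interleaved")
            seen_ids.add(conv_id)
            current_id = conv_id
            expected_turn = 0
        if row["turn"] != expected_turn:
            raise ValueError(
                f"row {i}: expected turn {expected_turn}, got {row['turn']}"
            )
        expected_turn += 1
```

Running this at load time surfaces ordering errors immediately instead of letting them show up mid-benchmark.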


3 participants